package com.easy.mq.factory;

import java.lang.annotation.Annotation;
import java.util.Map;

import org.apache.commons.collections.MapUtils;

import com.easy.mq.analysis.AE;
import com.easy.mq.annotation.KafkaListener;
import com.easy.mq.cache.ICache;
import com.easy.mq.config.consumer.AbstractConsumerConfig;
import com.easy.mq.config.consumer.KafkaConsumerConfig;
import com.easy.mq.entry.SpringContextUtil;

public class KafkaConfigFactory extends AbstractMQConfigFactory<KafkaConsumerConfig> {
	
	public KafkaConfigFactory(Map<String,Object> config) {
		super(config);
	}
	
	@Override
	public AbstractConsumerConfig getConsumerConfig(Annotation annotation) {
		KafkaConsumerConfig config = new KafkaConsumerConfig();
		KafkaListener kafkaListener = (KafkaListener) annotation;
		config.setProperties(getConfig());
		if(getConfig().get("cache") != null) {
			if(MapUtils.getString(getConfig(),"cache").indexOf(".") > 0){
				try {
					config.setCache((ICache)Class.forName(MapUtils.getString(getConfig(),"cache")).newInstance());
				} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
					e.printStackTrace();
				}
			}else {
				config.setCache((ICache)SpringContextUtil.getBean(MapUtils.getString(getConfig(),"cache")));
			}
		}
		
		config.setExpire(MapUtils.getIntValue(getConfig(),"expire"));
		config.setAutoCommit(MapUtils.getBooleanValue(getConfig(),"enable.auto.commit"));
		config.setTransfer(MapUtils.getString(getConfig(),"transfer"));
		config.setTopic(AE.getValue(kafkaListener.topic(),getConfig()));
		config.setGroup(AE.getValue(kafkaListener.group(),getConfig()));
		config.setTimeout(MapUtils.getIntValue(getConfig(),"timeout"));
		return config;
		
	}
	
	@SuppressWarnings({"unchecked", "rawtypes"})
	@Override
	public Class getAnnotation() {
		return KafkaListener.class;
	}

}
